In [85]:
"""
Script for parsing logs
"""

import os
import sys
sys.path.append(os.path.abspath('../utility'))

import yaml
import json
import pprint
import tabulate
import humanize
import datetime

import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy.interpolate import interp1d

import seaborn as sb
import matplotlib.cm as cm
import matplotlib.pyplot as plt

from const import *
from collections import OrderedDict

## Seaborn aesthetics
sb.set_context(
    "talk",
    font_scale=1,
    rc={
        "lines.linewidth": 1,
        "text.usetex": True,
        "font.family": 'serif',
        "font.serif": 'Palatino',
        "font.size": 16
    })
sb.set_style("whitegrid")

stats = {}
link_stats = {}
cloud_stats = {}
latency = None
In [86]:
"""
Non-Cloud Simulation logs
"""

# log_path = "../logs/march"
# log_path += "20190316-2030" ## Two Child - Wait Policy (60 seconds)
# log_path += "20190316-2029" ## Three Child - Wait Policy (60 seconds)

# log_path += "20190316-2244" ## Two Child - Wait Policy (120 seconds)
# log_path += "20190316-2243" ## Three Child - Wait Policy (120 seconds)

# log_path += "20190316-2316" ## Two Child - Simple Policy
# log_path += "20190316-2317" ## Three Child - Simple Policy

# log_path += "20190317-0013" ## Two Child - Time Policy
# log_path += "20190317-0011" ## Three Child - Time Policy

# log_path += "20190317-0058" ## Two Child - Accuracy Policy
# log_path += "20190317-0059" ## Three Child - Accuracy Policy

# log_path += "20190317-1544-five-child-simple" ## Five Child - Simple
# log_path += "20190317-1545-seven-child-simple" ## Seven Child - Simple

# log_path += "20190317-1625-1-2-4-simple"
# log_path += "20190317-1802-1-2-4-simple"

# log_path += "20190319-2032-1-2-4-simple/"
# log_path += "20190319-2035-seven-child-simple/"
# log_path += "20190319-2259-five-child-simple/"
# log_path += "20190319-2251-seven-child-simple/"
# log_path += "20190320-0130-five-child-simple"
# log_path += "20190320-0133-five-child-simple-1"
Out[86]:
'\nNon-Cloud Simulation logs\n'
In [87]:
"""
Cloud Simulation logs
"""

log_path = "../logs/"

# log_path += "20190329-0121-two-child"
# log_path += "20190329-0152-three-child"
# log_path += "20190329-0254-five-child"
# log_path += "20190329-0256-seven-child"
# log_path += "20190329-1625-five-child-simple"
# log_path += "20190329-1626-seven-child-simple"
# log_path += "20190329-1629-three-child-simple"
# log_path += "20190329-1844-1-2-4-simple"
# log_path += "20190329-1950-1-2-4-simple"
# log_path += "20190925-0110-testing-ten-ada-medium-unbiased"
# log_path += "20190930-2200-testing-ten-biased"
# log_path += "20190930-2222-testing-ten-biased"
# log_path += "20190930-2256-testing-ten-biased"
# log_path += "20190930-2305-testing-ten-biased"
# log_path += "20191001-2359-testing-124-biased"
log_path += "20191006-2333-biased-1-3-10-aniket-params"
In [88]:
"""
Set Log file and config file
"""

log_file, config_file = None, None
try:
    for file in os.listdir(log_path):
        if file.endswith(".log"): log_file = os.path.join(log_path, file)
        if file.endswith(".yaml"): config_file = os.path.join(log_path, file)
except FileNotFoundError:
    print("Log Directory not found"); exit(0)

if log_file is None or config_file is None:
    print("Invalid Log directory");	exit(0)
In [89]:
"""
Read yaml from config file
"""

with open(config_file, 'r') as f:

    raw_data = yaml.load(f.read())
    tree = {}

    for x in raw_data['nodes']:
        tree[x['id']] = x
        tree[x['id']]['is_worker'] = True
        tree[x['id']]['children_count'] = 0

    for x in raw_data['nodes']:

        if 'parent_id' in x:
            tree[x['parent_id']]['is_worker'] = False
            tree[x['parent_id']]['children_count'] += 1
        else:
            tree[x['id']]['parent_id'] = -1
    
    tree[-1] = raw_data['cloud']
    tree[-1]['parent_id'] = -1
    tree[-1]['is_worker'] = False

config = tree
/home/mayank/.local/lib/python3.5/site-packages/ipykernel_launcher.py:7: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.
  import sys
In [90]:
"""
Collect data
"""

f = open(log_file, 'r')

latency = {}
for node_id in config:
    if not config[node_id]['is_worker']:
        latency.update({node_id:OrderedDict()})
window_lags_pointer = { node_id:0 for node_id in config }
base_timestamp, final_timestamp = None, None

for row in f:

    data = json.loads(row)
    node_id, payload, timestamp = data[NODE_ID], data[PAYLOAD], data[TIMESTAMP]
    parent_id = config[node_id]['parent_id']

    if base_timestamp is None: base_timestamp = timestamp
    final_timestamp = timestamp

    if data[TYPE] == STATISTIC:

        ## Calculate Accuracy and timestamp
        if node_id in stats: 
            stats[node_id][TIMESTAMP].append(timestamp - base_timestamp)
            try:
                for test_file in payload[ACCURACY]:
                    stats[node_id][ACCURACY][test_file].append(payload[ACCURACY][test_file])
            except:
                for test_file in payload[POST_MERGE_ACCURACY]:
                    stats[node_id][ACCURACY][test_file].append(payload[POST_MERGE_ACCURACY][test_file])
        else:
            try:
                stats[node_id] = {
                    ACCURACY 		: { test_file:[a] for test_file,a in payload[ACCURACY].items() },
                    TIMESTAMP 		: [timestamp - base_timestamp]
                }
            except:
                stats[node_id] = {
                    ACCURACY 		: { test_file:[a] for test_file,a in payload[POST_MERGE_ACCURACY].items() },
                    TIMESTAMP 		: [timestamp - base_timestamp]
                }

        ##Calculate Runtime, Process time and total datapoints (if node is a worker)
        if config[node_id]['is_worker']:
            if RUNTIME in stats[node_id]:
                stats[node_id][RUNTIME] += payload[RUNTIME]
                stats[node_id][PROCESS_TIME] += payload[PROCESS_TIME] 
                stats[node_id][DATAPOINTS] += payload[DATAPOINTS]
            else:
                stats[node_id][RUNTIME] = payload[RUNTIME]
                stats[node_id][PROCESS_TIME] =  payload[PROCESS_TIME]
                stats[node_id][DATAPOINTS] = payload[DATAPOINTS]

    elif data[TYPE] == PROCESSED:		
        ## Calculate accuracy lag - Part 1 (Store window processing timestamp)
        if config[node_id]['is_worker']:
            ## Log which indicates window is processed
            window_id = payload[WINDOW_ID]

            if window_id not in latency[parent_id]:
                latency[parent_id][window_id] = { 
                    node_id: timestamp 
                }
            else:
                latency[parent_id][window_id].update({node_id:timestamp})
                
    elif data[TYPE] == PUSHED:
        ## Calculate network cost
        if NETWORK_COST in payload:
            if (parent_id, node_id) in link_stats:
                link_stats[(parent_id, node_id)] += payload[NETWORK_COST]
            else:
                link_stats[(parent_id, node_id)] = payload[NETWORK_COST]

    elif data[TYPE] == PULLED:
        
        ## Calculate network cost
        if NETWORK_COST in payload:
            if (parent_id, node_id) in link_stats:
                link_stats[(parent_id, node_id)] += payload[NETWORK_COST]
            else:
                link_stats[(parent_id, node_id)] = payload[NETWORK_COST]
                
        ## Calculate accuracy lag - Part 2 (Receive model from parent)
        if config[node_id]['is_worker']:
            ## Log which indicates model receival
            lag_pointer = window_lags_pointer[node_id]
            if lag_pointer in latency[parent_id]:
                while lag_pointer in latency[parent_id] and len(latency[parent_id][lag_pointer]) == config[parent_id]['children_count']:
                    latency[parent_id][lag_pointer][node_id] = timestamp - latency[parent_id][lag_pointer][node_id]
                    lag_pointer += 1
                window_lags_pointer[node_id] = lag_pointer
    
    elif data[TYPE] == CLSTATISTIC:
        
        ## Cloud Statistic
        query_node = payload[NODE_ID]
        if query_node not in config: continue
        if query_node in cloud_stats:
            cloud_stats[query_node][TIMESTAMP].append(timestamp - base_timestamp)
            for test_file in payload[ACCURACY]:
                cloud_stats[query_node][ACCURACY][test_file].append(payload[ACCURACY][test_file])
        else:
            cloud_stats[query_node] = {
                ACCURACY 		: { test_file:[a] for test_file,a in payload[ACCURACY].items() },
                TIMESTAMP 		: [timestamp - base_timestamp]
            }
            

num_nodes = len(stats)
num_workers = sum([1 if v['is_worker'] else 0 for k,v in config.items()])
f.close()

Experiment Configuration

In [91]:
"""
Print yaml
"""

with open(config_file, 'r') as f:
    print(yaml.dump(yaml.load(f.read()), default_flow_style=False))
application_arguments:
- alpha: 0.4
  batch_size: 20
  epochs: 10
  hidden_layer_sizes:
  - 20
  - 10
  - 10
  input_size: 784
  model: MNIST
  num_hidden_layers: 3
  output_size: 10
bandwidths:
- bandwidth: 1000000
  dest_id: 2
  src_id: 1
cloud:
  cpus: 12
  machine: 0
  memory: 2048M
  port: 9040
default_args: 0
default_bandwidth: 1000000
default_cpus: 1
default_docker_image: aniketshirke/distributedlearning:simulator
default_host_test_directory: ~/Simulator/mnist_data/test
default_kafka_server: 10.2.32.91:9092
default_memory: 500M
default_policy: 0
default_test_directory: /TreeNN/data/
default_window_interval: 42
default_window_limit: 200
machine:
- ip: 10.129.26.111
  password: '<redacted>'
  username: ub
nodes:
- cpus: 2
  id: 1
  machine: 0
  memory: 1024M
  port: 8040
- id: 2
  machine: 0
  parent_id: 1
  port: 8041
- id: 3
  machine: 0
  parent_id: 1
  port: 8042
- id: 4
  machine: 0
  parent_id: 1
  port: 8043
- id: 5
  machine: 0
  parent_id: 2
  port: 8044
  sensors:
  - 30
- id: 6
  machine: 0
  parent_id: 2
  port: 8045
  sensors:
  - 31
- id: 7
  machine: 0
  parent_id: 2
  port: 8046
  sensors:
  - 32
- id: 8
  machine: 0
  parent_id: 2
  port: 8047
  sensors:
  - 33
- id: 9
  machine: 0
  parent_id: 3
  port: 8048
  sensors:
  - 34
- id: 10
  machine: 0
  parent_id: 3
  port: 8049
  sensors:
  - 35
- id: 11
  machine: 0
  parent_id: 3
  port: 8050
  sensors:
  - 36
- id: 12
  machine: 0
  parent_id: 4
  port: 8051
  sensors:
  - 37
- id: 13
  machine: 0
  parent_id: 4
  port: 8052
  sensors:
  - 38
- id: 14
  machine: 0
  parent_id: 4
  port: 8053
  sensors:
  - 39
policy:
- args: null
  type: SimplePolicy
- args:
    pull_interval: 120
    push_interval: 120
  type: TimePolicy
- args:
    pull_count: 400
    push_count: 1
  type: WindowPolicy
- args:
    threshold: 2
  type: AccuracyPolicy

/home/mayank/.local/lib/python3.5/site-packages/ipykernel_launcher.py:6: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.
  

Experiment Information

In [92]:
"""
Print Duration
"""
from IPython.display import Markdown as md

experiment_duration = humanize.naturaldelta(datetime.timedelta(seconds=final_timestamp - base_timestamp))

md("Experiment duration: %s"%experiment_duration)
Out[92]:

Experiment duration: 30 minutes

In [93]:
"""
Print Statistics
"""
from IPython.display import display, HTML

exp_info = pd.DataFrame(columns=['Runtime', 'Process time', 'Average Datapoints per Window'], index=stats.keys())

for node_id, info in stats.items():
    if config[node_id]['is_worker']:
        runtime = humanize.naturaldelta(datetime.timedelta(seconds=info[RUNTIME]))
        process_time = humanize.naturaldelta(datetime.timedelta(seconds=info[PROCESS_TIME]))
        datapoints = info[DATAPOINTS]
        exp_info.loc[node_id] = [runtime, process_time, datapoints/len(info)]
exp_info.index.name='Node ID'        

exp_info.fillna(value='-', inplace=True)
display(HTML(exp_info.to_html()))
Runtime Process time Average Datapoints per Window
Node ID
1 - - -
2 - - -
3 - - -
4 - - -
5 24 minutes 5 minutes 1185.2
6 24 minutes 5 minutes 1190.6
7 24 minutes 5 minutes 1169
8 23 minutes 5 minutes 1163.2
9 24 minutes 6 minutes 1196.8
10 23 minutes 5 minutes 1155.2
11 23 minutes 5 minutes 1160.6
12 24 minutes 5 minutes 1159.8
13 24 minutes 6 minutes 1174.2
14 24 minutes 6 minutes 1178.4
-1 - - -

Network Cost

In [94]:
"""
Print Link Statistics
"""
from IPython.display import display, HTML

link_matrix = pd.DataFrame(np.empty(shape=(num_nodes,num_nodes), dtype=object), columns=stats.keys(), index=stats.keys())

for (s,d) in link_stats:
    link_matrix.at[s,d] = humanize.naturalsize(link_stats[(s,d)], gnu=True)
    link_matrix.at[d,s] = humanize.naturalsize(link_stats[(s,d)], gnu=True)

link_matrix.fillna(value='-', inplace=True)

# display(HTML(link_matrix.to_html()))
display(HTML(link_matrix.to_html()))
1 2 3 4 5 6 7 8 9 10 11 12 13 14 -1
1 - 25.1M 18.7M 18.5M - - - - - - - - - - -
2 25.1M - - - 6.4M 6.4M 6.2M 6.2M - - - - - - -
3 18.7M - - - - - - - 6.4M 6.2M 6.2M - - - -
4 18.5M - - - - - - - - - - 6.2M 6.2M 6.2M -
5 - 6.4M - - - - - - - - - - - - -
6 - 6.4M - - - - - - - - - - - - -
7 - 6.2M - - - - - - - - - - - - -
8 - 6.2M - - - - - - - - - - - - -
9 - - 6.4M - - - - - - - - - - - -
10 - - 6.2M - - - - - - - - - - - -
11 - - 6.2M - - - - - - - - - - - -
12 - - - 6.2M - - - - - - - - - - -
13 - - - 6.2M - - - - - - - - - - -
14 - - - 6.2M - - - - - - - - - - -
-1 - - - - - - - - - - - - - - -

Model Accuracy for each Node

In [95]:
"""
Plot Accuracy
"""

smooth_accuracy = {node_id: {} for node_id in stats}
base_colors = cm.rainbow(np.linspace(0, 1, num_workers))

for node_id, stat in stats.items():
    
    ## Init plot and colormap
    plt.figure(figsize=(12, 8))
    colors = [base_colors[i%num_workers] for i in range(len(stat[ACCURACY]))]
    #colors = cm.Accent(np.linspace(0, 1, len(stat[ACCURACY])))
    
    ## Plotting
    files = OrderedDict(sorted(stat[ACCURACY].items()))    
    for f,c in zip(files,colors):
        if len(stat[ACCURACY][f]) < 2: continue
        ## Scatterplot in the background
        ax = sb.scatterplot(np.array(stat[TIMESTAMP]),np.array(stat[ACCURACY][f]),color=c)
        
        ## Smooth fit to the data
        lowess = sm.nonparametric.lowess(np.array(stat[ACCURACY][f]), np.array(stat[TIMESTAMP]), frac=0.4)
        [lowess_x, lowess_y] = list(zip(*lowess))
        plt.plot(lowess_x, lowess_y, color=c, label=f)
        
        smooth_accuracy[node_id][f] = interp1d(lowess_x, lowess_y, bounds_error=False)

    ## Axes aesthetics
    ax.tick_params(direction='in', length=6, width=2, colors='k', which='major')
    ax.tick_params(direction='in', length=4, width=1, colors='r', which='minor')
    ax.minorticks_on() 
    sb.despine()
    
    plt.title('Accuracy plot for Node %d'%node_id)
    plt.xlabel('Seconds')
    plt.ylabel('Accuracy')
    plt.xlim(0)
    plt.ylim(-5,100)
    plt.legend(ncol=2, loc='lower right')
    
    plt.show()
    #plt.clf()

Model Accuracy for Cloud

In [96]:
"""
Plot Accuracy for cloud
"""

stat = {ACCURACY: {f:[] for f in cloud_stats[1][ACCURACY]}, TIMESTAMP: []}
for query_node in cloud_stats:
    for f in cloud_stats[query_node][ACCURACY]:
        stat[ACCURACY][f].extend(cloud_stats[query_node][ACCURACY][f])
    stat[TIMESTAMP].extend(cloud_stats[query_node][TIMESTAMP])
    
## Init plot and colormap
plt.figure(figsize=(12, 8))
colors = [base_colors[i%num_workers] for i in range(len(stat[ACCURACY]))]
#colors = cm.Accent(np.linspace(0, 1, len(stat[ACCURACY])))

## Plotting
files = OrderedDict(sorted(stat[ACCURACY].items()))    
for f,c in zip(files,colors):

    ## Scatterplot in the background
    ax = sb.scatterplot(np.array(stat[TIMESTAMP]),np.array(stat[ACCURACY][f]),color=c)

    ## Smooth fit to the data
    lowess = sm.nonparametric.lowess(np.array(stat[ACCURACY][f]), np.array(stat[TIMESTAMP]), frac=0.4)
    [lowess_x, lowess_y] = list(zip(*lowess))
    plt.plot(lowess_x, lowess_y, color=c, label=f)

## Axes aesthetics
ax.tick_params(direction='in', length=6, width=2, colors='k', which='major')
ax.tick_params(direction='in', length=4, width=1, colors='r', which='minor')
ax.minorticks_on() 
sb.despine()

plt.title('Accuracy plot for Cloud')
plt.xlabel('Seconds')
plt.ylabel('Accuracy')
plt.xlim(0)
plt.ylim(0,100)
plt.legend(ncol=2, loc='lower right')

plt.show()
#plt.clf()

Accuracy Lag for each Node

In [97]:
"""
Plot Accuracy Lag
"""

accuracy_lag = {query_node: None for query_node in cloud_stats}

for query_node in cloud_stats:
    cl_accuracy, cl_timestamp = cloud_stats[query_node][ACCURACY], cloud_stats[query_node][TIMESTAMP]
    accuracy = { f : smooth_accuracy[query_node][f](cl_timestamp) for f in cl_accuracy if len(stats[query_node][ACCURACY][f]) > 2}
    accuracy_lag[query_node] = {f : [ cla - a for cla,a in zip(cl_accuracy[f], accuracy[f])] for f in cl_accuracy if len(stats[query_node][ACCURACY][f]) > 2}

for query_node, lag in accuracy_lag.items():
    
    ## Init plot and colormap
    plt.figure(figsize=(12, 8))
    colors = [base_colors[i%num_workers] for i in range(len(lag))]
    
    cl_timestamp = cloud_stats[query_node][TIMESTAMP]
    ## Plotting
    files = OrderedDict(sorted(lag.items()))    
    for f,c in zip(files,colors):
        if len(stats[query_node][ACCURACY][f]) < 2: continue
        ## Scatterplot in the background
        ax = sb.scatterplot(np.array(cl_timestamp),np.array(lag[f]),color=c)
        
        ## Smooth fit to the data
        lowess = sm.nonparametric.lowess(np.array(lag[f]), np.array(cl_timestamp), frac=0.4)
        [lowess_x, lowess_y] = list(zip(*lowess))
        plt.plot(lowess_x, lowess_y, color=c, label=f)
        
    ## Axes aesthetics
    ax.tick_params(direction='in', length=6, width=2, colors='k', which='major')
    ax.tick_params(direction='in', length=4, width=1, colors='r', which='minor')
    ax.minorticks_on() 
    sb.despine()
    
    plt.title('Accuracy Lag for Node %d'%query_node)
    plt.xlabel('Seconds')
    plt.ylabel('Accuracy Lag')
    plt.xlim(0)
    plt.ylim(-100,100)
    plt.legend(ncol=2, loc='lower right')
    
    plt.show()
    #plt.clf()

Latency for the configuration

In [98]:
"""
Plot Latency
"""

## Reshape dictionary for visualization and filter out odd values of latency (at the end)
modified_latency = {}
for node_id in config: 
    if config[node_id]['is_worker']:
        modified_latency.update({node_id: []})

for parent_id in latency:
    for window_id in latency[parent_id]:
        for node_id in latency[parent_id][window_id]:
            lag = latency[parent_id][window_id][node_id]
            if lag < base_timestamp:
                modified_latency[node_id].append((window_id, lag))


## Graph colors
plt.figure(figsize=(12, 8))
base_colors = cm.rainbow(np.linspace(0, 1,len(modified_latency))) #For the case of MNIST and one level of heirarchy
colors = dict(list(zip(modified_latency.keys(), base_colors)))

for node_id in modified_latency:
    if len(modified_latency[node_id]) == 0: continue
    [window_ids, lags] = zip(*modified_latency[node_id])
    window_ids, lags = list(window_ids), list(lags)
    
    ## Scatterplot in the background
    ax = sb.scatterplot(window_ids,lags, color=colors[node_id])
    
    ## Smooth fit to the data
    lowess = sm.nonparametric.lowess(lags, window_ids, frac=0.4)
    [lowess_x, lowess_y] = list(zip(*lowess))
    plt.plot(lowess_x, lowess_y, color=colors[node_id], label=node_id)
    
## Axes aesthetics
ax.tick_params(direction='in', length=6, width=2, colors='k', which='major')
ax.tick_params(direction='in', length=4, width=1, colors='r', which='minor')
ax.minorticks_on() 
sb.despine()

plt.title('Latency for the configuration')
plt.xlabel('Window ID')
plt.ylabel('Latency (in seconds)')
plt.xlim(0)
plt.legend()
plt.show()
# plt.clf()
In [ ]: